package com.example.demo.stream;

import cn.hutool.json.JSONUtil;
import com.example.demo.entity.CommonConstants;
import com.example.demo.entity.Event;
import com.example.demo.stream.deal.CountEventAggregate;
import com.example.demo.util.PropertyUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Demo Flink job: consumes JSON messages from a Kafka topic, deserializes them
 * into {@link Event}s, performs a tumbling-window count aggregation, and writes
 * the raw input stream to a local CSV file via a rolling {@link StreamingFileSink}.
 */
public class KafkaSourceTest {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Load Kafka connection properties from the project's property file.
        Properties properties = PropertyUtil.loadProperty();
        // Build the Kafka source.
        KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
                // Message values are plain UTF-8 strings.
                .setValueOnlyDeserializer(new SimpleStringSchema(StandardCharsets.UTF_8))
                // Topic to subscribe to.
                .setTopics(CommonConstants.KAFKA_TOPIC)
                // Apply the loaded Kafka client properties.
                .setProperties(properties)
                // Start reading from the latest offsets (only new messages).
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
        // Attach the Kafka source to the environment.
        DataStreamSource<String> stream = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // Parse each JSON record into an Event.
        SingleOutputStreamOperator<Event> eventStream = stream.map(item -> JSONUtil.toBean(item, Event.class));
        // Key the stream. NOTE(review): keying by Objects::nonNull puts (almost) all
        // records into a single key group — presumably intentional for this demo so
        // the window aggregation sees one global group; confirm if per-entity keys were meant.
        KeyedStream<Event, Boolean> employeePoBooleanKeyedStream = eventStream.keyBy(Objects::nonNull);
        // Assign event-time timestamps and watermarks.
        // BUG FIX: assignTimestampsAndWatermarks returns a NEW stream; the original code
        // discarded the result, so the assignment had no effect. Capture it and use it below.
        SingleOutputStreamOperator<Event> watermarkedStream =
                employeePoBooleanKeyedStream.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> {
                            // Event.timestamp is already in milliseconds, as Flink expects.
                            return element.timestamp;
                        }));
        // Sliding event-time window (alternative):
//        watermarkedStream.keyBy(Objects::nonNull).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
        // Tumbling processing-time window: aggregate over fixed 5-second intervals.
        watermarkedStream.keyBy(Objects::nonNull)
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(5000)))
                // Session window (alternative):
//        watermarkedStream.keyBy(Objects::nonNull).window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                // Count events per window.
                .aggregate(new CountEventAggregate())
                .print();
        StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("D:\\java\\sink.csv"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy( // Rolling policy for part files.
                        DefaultRollingPolicy.builder()
                                // Roll when a part file exceeds 1 GiB.
                                .withMaxPartSize(1024 * 1024 * 1024)
                                // BUG FIX: these builder methods take MILLISECONDS;
                                // toSeconds(5) produced 300, i.e. a 300 ms interval,
                                // not the intended 5 minutes.
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(5))
                                // Start a new part file after 5 minutes of inactivity.
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .build()
                )
                .build();
        // Write the raw input stream out to the file sink.
        stream.addSink(streamingFileSink);
        env.execute();
    }

}
